LakeSoul Flink Connector
该功能于 2.3.0 版本起提供。
2.3.0 版本适配的是 Flink 1.14,2.4.0 版本起升级到了 Flink 1.17。
LakeSoul 提供了 Flink Connector,实 现了 Flink Dynamic Table 接口,可以使用 Flink 的 DataStream API, Table API 或 SQL 来执行对 LakeSoul 数据的读写,读和写均支持流式和批式两种模式。在 Flink 流式读、写时君支持 Flink Changelog Stream 语义。
1. 环境准备
设置 LakeSoul 元数据,请参考 设置 Spark/Flink 工程/作业
设置全局的 Warehouse 路径:可以在 $FLINK_HOME/conf/flink-conf.yaml
中指定:
flink.warehouse.dir: "s3://bucket/path"
如果指定了 warehouse 路径,则表路径默认为 warehouse_dir/table_name
。如果建表时在属性中指定了 path
属性,则优先使用该属性作为表的存储路径。
Flink 引入 LakeSoul 依赖的方法:下載 lakesoul-flink-1.17-2.6.2.jar,放入 $FLINK_HOME/lib
,或在启动时指定 jar 的路径。
为了使用 Flink 创建 LakeSoul 表,推荐使用 Flink SQL Client,支持直接使用 Flink SQL 命令操作 LakeSoul 表,本文档中 Flink SQL 是在 Flink SQL Client 界面直接输入语句;Table API 需要在 Java 项目中编写使用。
切换到 Flink 文件夹下,执行命令开启 SQL Client 客户端。
# 启动 Flink SQL Client
bin/sql-client.sh embedded -j lakesoul-flink-1.17-2.6.2.jar
2. DDL
2.1 创建catalog
创建 LakeSoul 类型的 catalog,指定 catalog 类型为 LakeSoul。指定 LakeSoul 的 database,默认为 default
- Flink SQL
create catalog lakesoul with('type'='lakesoul');
use catalog lakesoul;
show databases;
use `default`;
show tables; - Table API
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnv.registerCatalog("lakeSoul", lakesoulCatalog);
tEnv.useCatalog("lakeSoul");
tEnv.useDatabase("default");
2.2 建表
LakeSoul 支持通过 Flink 创建多种类型的表,包括无主键表、有主键表,以及 CDC 格式表。表可以有多个 Range 分区字段。
建表语句中各个部分参数含义:
参数 | 含义说明 | 参数填写格式 |
---|---|---|
PARTITIONED BY | 用于指定表的 Range 分区字段 ,如果不存在 range 分区字段,则省略 | PARTITIONED BY (date ) |
PRIMARY KEY | 用于指定表的主键,可以包含多个列 | PARIMARY KEY (id , name ) NOT ENFORCED |
connector | 数据源连接器,用于指定数据源类型 | 'connector'='lakesoul' |
hashBucketNum | 有主键表必须设置哈希分片数 | 'hashBucketNum'='4' |
path | 用于指定表的存储路径 | 'path'='file:///tmp/lakesoul/flink/sink/test' |
use_cdc | 设置表是否为 CDC 格式 (参考 CDC 表格式 ) | 'use_cdc'='true' |
LakeSoul 表支持 Flink 的所有常用数据类型,并与 Spark SQL 数据类型一一对应,使 LakeSoul 表能同时支持 Flink 和 Spark 的读写。
- Flink SQL
-- 创建test_table表,以id和name作为联合主键,以region和date作为两级range分区,catalog是lakesoul,database是default
create table `lakesoul`.`default`.test_table (
`id` INT,
name STRING,
score INT,
`date` STRING,
region STRING,
PRIMARY KEY (`id`,`name`) NOT ENFORCED
) PARTITIONED BY (`region`,`date`)
WITH (
'connector'='lakesoul',
'hashBucketNum'='4',
'use_cdc'='true',
'path'='file:///tmp/lakesoul/flink/sink/test');